use std::sync::Arc;

use polars_error::PolarsResult;
use polars_io::pl_async;
use polars_plan::dsl::sink2::FileProviderArgs;

use crate::async_executor;
use crate::async_primitives::connector;
use crate::nodes::TaskPriority;
use crate::nodes::io_sinks2::components::file_provider::FileProvider;
use crate::nodes::io_sinks2::components::file_sink::{FileSinkPermit, FileSinkTaskData};
use crate::nodes::io_sinks2::components::size::RowCountAndSize;
use crate::nodes::io_sinks2::writers::interface::FileWriterStarter;
use crate::utils::task_handles_ext;

#[derive(Clone)]
pub struct PartitionSinkStarter {
    pub file_provider: Arc<FileProvider>,
    pub writer_starter: Arc<dyn FileWriterStarter>,
}

impl PartitionSinkStarter {
    pub fn start_sink(
        &self,
        file_provider_args: FileProviderArgs,
        start_position: RowCountAndSize,
        file_permit: FileSinkPermit,
    ) -> PolarsResult<FileSinkTaskData> {
        let file_provider = Arc::clone(&self.file_provider);
        let file_open_task = task_handles_ext::AbortOnDropHandle(
            pl_async::get_runtime()
                .spawn(async move { file_provider.open_file(file_provider_args).await }),
        );

        let (morsel_tx, morsel_rx) = connector::connector();

        let writer_handle = self
            .writer_starter
            .start_file_writer(morsel_rx, file_open_task)?;

        let task_handle = async_executor::spawn(TaskPriority::High, async move {
            writer_handle.await?;
            Ok(file_permit)
        });

        Ok(FileSinkTaskData {
            morsel_tx,
            start_position,
            task_handle,
        })
    }
}
